<?php

/*
 * Copyright (C) 2009 - 2011 Pham Cong Dinh
 *
 * This file is part of Spica.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 3 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

/**
 * SpicaRedisClient is a client class for the Redis protocol.
 * It can be used to interact with another Redis server from within a PHP script
 *
 * namespace spica\core\datasource\redis\RedisClient;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 21, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
class SpicaRedisClient
{
    /**
     * Every database connection handed out by {@link getConnection()}, in creation order.
     *
     * @var array
     */
    private $_pool = array();

    /**
     * Constructs an object of <code>SpicaRedisClient</code>
     */
    public function __construct() {}

    /**
     * Creates a new Redis database connection for the given context and
     * registers it in the pool so that {@link disconnect()} can close it later.
     *
     * No socket is opened here; the returned object connects lazily on its first send().
     *
     * @param  SpicaRedisContext $context
     * @return SpicaRedisDatabaseConnection
     */
    public function getConnection($context)
    {
        $conn = new SpicaRedisDatabaseConnection($context);
        $this->_pool[] = $conn;
        return $conn;
    }

    /**
     * Gets the database index of the most recently created connection.
     *
     * @return int|false False is returned when there is no connection in the pool
     */
    public function getCurrentDatabase()
    {
        $len = count($this->_pool);

        if (0 === $len)
        {
            return false;
        }

        return $this->_pool[$len - 1]->getCurrentDatabase();
    }

    /**
     * Disconnects from Redis servers.
     *
     * Pooled connections that never opened a socket are skipped: their
     * underlying network connection is still null at that point.
     */
    public function disconnect()
    {
        foreach ($this->_pool as $conn)
        {
            $c = $conn->getConnection();

            // The socket object is created lazily, so it may not exist yet
            if (null === $c || true !== $c->isConnected())
            {
                continue;
            }

            // Close through the database connection (not the raw socket) so that
            // its "database selected" state is reset together with the socket
            $conn->close();
        }
    }
}

/**
 * SpicaRedisClusterAwareClient is a client class for the Redis protocol.
 * It can be used to interact with multiple Redis servers from within a PHP script
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 22, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
class SpicaRedisClusterAwareClient
{
    // Placeholder: this class declares no members yet.

}

/**
 * Represents a Redis connection context including host, port, transport protocol,
 * Redis database and other details on how to create a socket connection.
 *
 * namespace spica\core\datasource\redis\RedisContext;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 22, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
class SpicaRedisContext
{
    /**
     * Index of the Redis database to work with.
     *
     * @var int
     */
    public $db = 0;

    /**
     * Host name or IP address of the Redis server.
     *
     * @var string
     */
    public $host;

    /**
     * TCP/UDP port of the Redis server.
     *
     * @var int
     */
    public $port = 6379;

    /**
     * Timeout used while establishing the socket connection.
     *
     * @var int
     */
    public $timeout = 3;

    /**
     * Timeout used while sending data.
     *
     * @var int
     */
    public $sendTimeout = 10;

    /**
     * Transport prefix of the connection (tcp:// or udp://)
     *
     * @var string
     */
    public $transport = 'tcp://';

    /**
     * Stream context configuration of the connection.
     *
     * @var array
     */
    public $opt;

    /**
     * Constructs an object of <code>SpicaRedisContext</code>.
     *
     * Only the keys that are present (and not null) in <code>$hostInfo</code> override
     * the defaults declared on the properties.
     *
     * @param array  $hostInfo Host related settings. Recognised keys:
     *                         host, port (optional), connect_timeout (optional), send_timeout (optional), transport (optional)
     * @param int    $db  Database index (optional)
     * @param array  $opt Network stream parameters (optional)
     */
    public function __construct($hostInfo, $db = 0, $opt = null)
    {
        // $hostInfo key => property that receives its value
        $mapping = array(
            'host'            => 'host',
            'port'            => 'port',
            'connect_timeout' => 'timeout',
            'send_timeout'    => 'sendTimeout',
            'transport'       => 'transport',
        );

        if (null !== $db)
        {
            $this->db = $db;
        }

        foreach ($mapping as $key => $property)
        {
            if (isset($hostInfo[$key]))
            {
                $this->$property = $hostInfo[$key];
            }
        }

        if (null !== $opt)
        {
            $this->opt = $opt;
        }
    }
}

/**
 * Represents a connection to a specific database index in a Redis server.
 *
 * namespace spica\core\datasource\redis\RedisDatabaseConnection;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 22, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
class SpicaRedisDatabaseConnection
{
    /**
     * Redis socket connection. Stays null until a connection is injected with
     * {@link setConnection()} or created lazily by {@link send()}.
     *
     * @var SpicaRedisNetConnection|null
     */
    private $_conn;

    /**
     * Redis database connection context.
     *
     * @var SpicaRedisContext
     */
    private $_context;

    /**
     * Has the database index of the context already been selected on the current socket?
     *
     * @var bool
     */
    private $_selected = false;

    /**
     * Constructs an object of <code>SpicaRedisDatabaseConnection</code>.
     *
     * @param SpicaRedisContext $context
     */
    public function __construct($context)
    {
        $this->_context = $context;
    }

    /**
     * Sets Redis socket connection.
     *
     * A previously used connection, if any, is closed first. The database index is
     * selected again on the next call to {@link send()}.
     *
     * @param SpicaRedisNetConnection $conn
     */
    public function setConnection($conn)
    {
        // There is nothing to close before the first connection has been assigned
        if (null !== $this->_conn && true === $this->_conn->isConnected())
        {
            $this->_conn->close();
        }

        $this->_selected = false;
        $this->_conn = $conn;
    }

    /**
     * Sends a command to Redis server.
     *
     * The socket is created and opened on demand. When the context targets a database
     * index other than 0, a SELECT command is issued once per opened socket before
     * the actual command is written.
     *
     * @throws Exception if the SELECT command cannot be sent or is not acknowledged with +OK
     * @param  SpicaRedisCommand $cmd
     * @return SpicaRedisResponse Reply to <code>$cmd</code>, still unread on the socket
     */
    public function send($cmd)
    {
        $context = $this->_context;

        if (null === $this->_conn)
        {
            // Argument order: transport, host, port, connect timeout, send timeout, stream options
            $this->_conn = new SpicaRedisConnection($context->transport, $context->host, $context->port, $context->timeout, $context->sendTimeout, $context->opt);
        }

        if (false === $this->_conn->isConnected())
        {
            $this->_conn->open();
            // A freshly opened socket always starts on database 0
            $this->_selected = false;
        }

        if (false === $this->_selected && 0 !== $context->db)
        {
            $bytes = $this->_conn->send('SELECT '.$context->db.SpicaRedisCommand::NL);

            if ($bytes < 1)
            {
                throw new Exception('Unable to send SELECT database command to Redis server.');
            }

            $reply = new SpicaRedisResponse($this->_conn->getSocket());

            // A successful SELECT is answered with the status code reply +OK
            if (SpicaRedisResponse::STATUS !== $reply->getType() || SpicaRedisResponse::STATUS_OK !== $reply->getStatus())
            {
                throw new Exception('Unable to use database index: '.$context->db.'.');
            }

            $this->_selected = true;
        }

        $this->_conn->send($cmd->toString());
        return new SpicaRedisResponse($this->_conn->getSocket());
    }

    /**
     * Gets Redis connection.
     *
     * @return SpicaRedisNetConnection|null Null when no socket connection has been created yet
     */
    public function getConnection()
    {
        return $this->_conn;
    }

    /**
     * Gets connection context.
     *
     * @return SpicaRedisContext
     */
    public function getContext()
    {
        return $this->_context;
    }

    /**
     * Gets the database index configured in the connection context.
     *
     * @return int
     */
    public function getCurrentDatabase()
    {
        return $this->_context->db;
    }

    /**
     * Closes socket connection to Redis server.
     *
     * Does nothing besides resetting the selection state when no socket
     * connection has been created yet.
     */
    public function close()
    {
        $this->_selected = false;

        if (null !== $this->_conn)
        {
            $this->_conn->close();
        }
    }
}

/**
 * Represents a Redis command.
 *
 * namespace spica\core\datasource\redis\RedisCommand;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 21, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
interface SpicaRedisCommand
{
    /**
     * Line terminator (CR LF) appended to commands sent to the Redis server.
     *
     * @var string
     */
    const NL = "\r\n";

    /**
     * Gets the command string, ready to be written to the connection.
     *
     * @return string
     */
    public function toString();

    /**
     * Is this command subject to hashing?
     *
     * @return bool
     */
    public function supportsHash();
}

/**
 * Represents a single Redis command that requires no parameters.
 *
 * namespace spica\core\datasource\redis\cmd\NoParamCommand;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 26, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
class SpicaRedisNoParamCommand implements SpicaRedisCommand
{
    /**
     * Name of the Redis command, without the line terminator.
     *
     * @var string
     */
    protected $_cmd;

    /**
     * Constructs an object of <code>SpicaRedisNoParamCommand</code>, i.e. a Redis
     * command that is sent without any argument.
     *
     * @param string $cmd
     */
    public function __construct($cmd)
    {
        $this->_cmd = $cmd;
    }

    /**
     * Replaces the Redis command held by this object.
     *
     * @param string $cmd
     */
    public function setCommand($cmd)
    {
        $this->_cmd = $cmd;
    }

    /**
     * Gets the command string: the command name followed by the line terminator.
     *
     * @return string
     */
    public function toString()
    {
        // NL is inherited from the SpicaRedisCommand interface
        $line = $this->_cmd.self::NL;

        return $line;
    }

    /**
     * Is this command subject to hashing? Never for this kind of command.
     *
     * @return bool
     */
    public function supportsHash()
    {
        return false;
    }
}

/**
 * The SpicaRedisNetConnection object provides an interface for UDP or TCP based communication.
 *
 * namespace spica\core\datasource\redis\RedisNetConnection;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 22, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
interface SpicaRedisNetConnection
{
    /**
     * Establishes a connection.
     */
    public function open();

    /**
     * Sends data over an open connection.
     *
     * @param string $data
     */
    public function send($data);

    /**
     * Receives up to <code>$length</code> bytes of data over an open connection.
     *
     * @param int $length
     * @return string|false Raw content from the socket
     */
    public function receive($length = 1024);

    /**
     * Gets connection socket.
     *
     * @return resource
     */
    public function getSocket();

    /**
     * Checks if a connection is established.
     *
     * @return bool
     */
    public function isConnected();

    /**
     * Closes the connection.
     */
    public function close();

    /**
     * Gets error number.
     *
     * @return int
     */
    public function getErrorNo();

    /**
     * Gets error message.
     *
     * @return string
     */
    public function getErrorMessage();
}

/**
 * A SpicaRedisConnection provides simplified access to a remote Redis server
 * through a TCP connection.
 *
 * namespace spica\core\datasource\redis\RedisConnection;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 21, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
class SpicaRedisConnection implements SpicaRedisNetConnection
{
    /**
     * Host address.
     *
     * @var string
     */
    private $_host;

    /**
     * Port number.
     *
     * @var int
     */
    private $_port;

    /**
     * Connection transport prefix, prepended to "host:port" when connecting.
     *
     * @var string
     */
    private $_transport;

    /**
     * Connection timeout.
     *
     * @var int Time-out value in second
     */
    private $_timeout = 4;

    /**
     * Timeout applied to the opened stream with <code>stream_set_timeout()</code>.
     *
     * @var int Time-out value in second
     */
    private $_sendTimeout = 10;

    /**
     * Socket send buffer.
     *
     * @var int Number of bytes. Defaults to 0 means write operations are unbuffered
     */
    private $_sendBuffer = 0;

    /**
     * Chunk size used by {@link receiveAll()}.
     *
     * @var int Number of bytes
     */
    private $_receiveBufferSize = 1024;

    /**
     * Whether the socket is blocking. Defaults to true.
     *
     * @var bool
     */
    private $_blocking = true;

    /**
     * Stream context configuration.
     *
     * @var array
     */
    private $_opt;

    /**
     * Is connection established?
     *
     * @var bool
     */
    private $_established = false;

    /**
     * Error number reported by the last connection attempt.
     *
     * @var int
     */
    private $_errorNo;

    /**
     * Error message reported by the last connection attempt.
     *
     * @var string
     */
    private $_errorMessage;

    /**
     * Socket connection stream.
     *
     * @var resource A resource of type "stream"
     */
    private $_socket;

    /**
     * Constructs an object of <code>SpicaRedisConnection</code>.
     *
     * No connection is made here; see {@link open()}.
     *
     * @param string $transport Connection transport protocol
     * @param string $host Server IP
     * @param int    $port Server port
     * @param int    $connectTimeout Timeout in seconds
     * @param int    $sendTimeout Timeout in seconds
     * @param array  $opt Stream context configuration (ignored unless it is an array)
     */
    public function __construct($transport, $host, $port, $connectTimeout = null, $sendTimeout = null, $opt = null)
    {
        $this->_transport = $transport;
        $this->_host      = $host;
        $this->_port      = $port;

        if (null !== $connectTimeout)
        {
            $this->_timeout = $connectTimeout;
        }

        if (null !== $sendTimeout)
        {
            $this->_sendTimeout = $sendTimeout;
        }

        if (true === is_array($opt))
        {
            $this->_opt = $opt;
        }
    }

    /**
     * Sets the size of the socket write buffer. The value is applied the next
     * time the connection is opened.
     *
     * @param int $buffer Number of bytes; 0 means write operations are unbuffered
     */
    public function setSendBuffer($buffer = 0)
    {
        $this->_sendBuffer = $buffer;
    }

    /**
     * Find out if the socket is configured for blocking mode.
     *
     * @return bool  The current blocking mode.
     */
    public function isBlocking()
    {
        return $this->_blocking;
    }

    /**
     * Sets whether the socket connection should be blocking or
     * not. A read call to a non-blocking socket will return immediately
     * if there is no data available, whereas it will block until there
     * is data for blocking sockets.
     *
     * The mode is applied the next time the connection is opened.
     *
     * @param bool $mode  True for blocking sockets, false for nonblocking.
     */
    public function setBlocking($mode)
    {
        $this->_blocking = $mode;
    }

    /**
     * Opens the connection. Any connection opened earlier through this object
     * is closed first.
     *
     * @return bool true if the connection was successfully established,
     *              false otherwise (see {@link getErrorNo()} and {@link getErrorMessage()})
     */
    public function open()
    {
        // Attempts to close previously opened connection
        $this->close();
        $this->_socket = stream_socket_client($this->_transport.$this->_host.':'.$this->_port, $this->_errorNo, $this->_errorMessage, $this->_timeout, STREAM_CLIENT_CONNECT);

        if (false !== $this->_socket)
        {
            if (null !== $this->_opt)
            {
                stream_context_set_params($this->_socket, $this->_opt);
            }

            stream_set_blocking($this->_socket, $this->_blocking);
            stream_set_timeout($this->_socket, $this->_sendTimeout);
            stream_set_write_buffer($this->_socket, $this->_sendBuffer);
            $this->_established = true;
            return true;
        }

        return false;
    }

    /**
     * Sends data over an open connection.
     *
     * @throws BadMethodCallException if the connection is not established
     * @param  string $out
     * @return int|false Number of bytes sent to socket
     */
    public function send($out)
    {
        if (false === $this->_established)
        {
            throw new BadMethodCallException('Connection is not established.');
        }

        // Output using fwrite() is normally buffered at 8K
        return fwrite($this->_socket, $out);
    }

    /**
     * Receives a single line of data over an open connection.
     *
     * @param int $length
     * @return string|false Raw content from the socket; false when nothing could be
     *                      read or the connection is not established
     */
    public function readLine($length = 1024)
    {
        if (false === $this->_established)
        {
            return false;
        }

        return fgets($this->_socket, $length);
    }

    /**
     * Reads up to the specified amount of data over an open connection.
     *
     * @param  int $length The maximum number of bytes to read from the socket.
     * @return string|false Data from the socket; false on failure or when the
     *                      connection is not established
     */
    public function receive($length = 1024)
    {
        if (false === $this->_established)
        {
            return false;
        }

        return fread($this->_socket, $length);
    }

    /**
     * Reads until the socket reports end-of-file, a read fails or a read times out.
     *
     * @return string|false All data collected so far, or false when the connection
     *                      is not established
     */
    public function receiveAll()
    {
        if (false === $this->_established)
        {
            return false;
        }

        $data = '';

        while (!feof($this->_socket))
        {
            $chunk = fread($this->_socket, $this->_receiveBufferSize);

            // A failed read never reaches EOF: stop instead of spinning forever
            if (false === $chunk)
            {
                break;
            }

            $data .= $chunk;

            if ('' === $chunk)
            {
                // Honour the stream timeout configured in open()
                $meta = stream_get_meta_data($this->_socket);

                if (true === $meta['timed_out'])
                {
                    break;
                }
            }
        }

        return $data;
    }

    /**
     * Writes a line of data to the socket, followed by a trailing "\r\n".
     *
     * @throws BadMethodCallException if the connection is not established
     * @param  string $out
     * @return int|false Bytes to send out or false
     */
    public function writeLine($out)
    {
        // Delegates to send() so that both write paths share the same state check
        return $this->send($out."\r\n");
    }

    /**
     * Gets stream-aware connection socket, opening the connection first when
     * it is not established yet.
     *
     * @return resource|false False when the connection could not be opened
     */
    public function getSocket()
    {
        if (false === $this->_established)
        {
            $this->open();
        }

        return $this->_socket;
    }

    /**
     * Tells whether {@link open()} succeeded and {@link close()} has not been called since.
     *
     * @return bool
     */
    public function isConnected()
    {
        return $this->_established;
    }

    /**
     * Closes the connection. Does nothing when the connection is not established.
     */
    public function close()
    {
        if (true === $this->_established)
        {
            fclose($this->_socket);
            // Do not keep a handle to the closed stream around
            $this->_socket       = null;
            $this->_errorNo      = null;
            $this->_errorMessage = null;
            $this->_established  = false;
        }
    }

    /**
     * Gets error number.
     *
     * @return int
     */
    public function getErrorNo()
    {
        return $this->_errorNo;
    }

    /**
     * Gets error message.
     *
     * @return string
     */
    public function getErrorMessage()
    {
        return $this->_errorMessage;
    }

    /**
     * Gets Redis server port in use.
     *
     * @return int
     */
    public function getPort()
    {
        return $this->_port;
    }

    /**
     * Gets Redis server IP in use.
     *
     * @return string
     */
    public function getHost()
    {
        return $this->_host;
    }
}

/**
 * Encapsulates the reply of a Redis server to a Redis client operation.
 *
 * namespace spica\core\datasource\redis\RedisResponse;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 21, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
class SpicaRedisResponse
{
    /**
     * Represents an error reply (such as -ERR unknown command or -ERR wrong number of arguments for 'select' command)
     *
     * @var int
     */
    const ERROR = 0;

    /**
     * Indicates an integer reply.
     *
     * @var int
     */
    const INT = 1;

    /**
     * Indicates a status reply.
     *
     * @var int
     */
    const STATUS = 2;

    /**
     * Indicates a bulk reply.
     *
     * @var int
     */
    const BULK = 3;

    /**
     * Indicates a multi-bulk reply.
     *
     * @var int
     */
    const MULTIBULK = 4;

    /**
     * Indicates server is up and running.
     *
     * @see SpicaRedisResponse::STATUS
     * @var string
     */
    const STATUS_PONG = 'PONG';

    /**
     * Represents a status code reply that indicates a success response (+OK)
     *
     * @see SpicaRedisResponse::STATUS
     * @var string
     */
    const STATUS_OK = 'OK';

    /**
     * Represents an integer reply that indicates values or a value is/are set by the command (:1)
     *
     * @see SpicaRedisResponse::INT
     * @var int
     */
    const INT_SET = 1;

    /**
     * Represents an integer reply that indicates values or a value is/are not set by the command (:0)
     *
     * @see SpicaRedisResponse::INT
     * @var int
     */
    const INT_NOTSET = 0;

    /**
     * The connection socket.
     *
     * @var resource A resource of type "stream"
     */
    private $_socket;

    /**
     * Has the reply header (first line) been read from the socket?
     *
     * @var bool
     */
    private $_headerChecked = false;

    /**
     * Is reply closed?
     *
     * @var bool
     */
    private $_closed = false;

    /**
     * Reply header.
     *
     * @var SpicaRedisReplyHeader
     */
    private $_header;

    /**
     * Constructs an object of <code>SpicaRedisResponse</code>.
     *
     * Nothing is read from the socket until one of the accessors is called.
     *
     * @param resource $socket
     */
    public function __construct($socket)
    {
        $this->_socket = $socket;
    }

    /**
     * Gets reply type.
     *
     * @throws Exception if the reply is closed
     * @throws UnexpectedValueException if the reply header cannot be parsed
     * @return int One of ERROR, INT, STATUS, BULK or MULTIBULK
     */
    public function getType()
    {
        // Response can be closed before
        $this->_checkClosed();
        $this->_process();

        return $this->_header->type;
    }

    /**
     * Gets error message from an error reply.
     *
     * @throws Exception if the reply is closed
     * @throws BadMethodCallException if not an error reply
     * @return string
     */
    public function getError()
    {
        // Response can be closed before
        $this->_checkClosed();
        $this->_process();

        if (self::ERROR !== $this->_header->type)
        {
            throw new BadMethodCallException('Not an error reply: '.$this->_header->type);
        }

        return $this->_header->error;
    }

    /**
     * Gets the integer value parsed from an integer reply. The reply is closed afterwards.
     *
     * @throws Exception if the reply is closed
     * @throws BadMethodCallException if not an integer reply
     * @return int
     */
    public function getInt()
    {
        // Response can be closed before
        $this->_checkClosed();
        $this->_process();

        if (self::INT !== $this->_header->type)
        {
            throw new BadMethodCallException('Not an integer reply: '.$this->_header->type);
        }

        $this->_closed = true;
        return (int) $this->_header->length;
    }

    /**
     * Gets status code from a status code reply. The reply is closed afterwards.
     *
     * @throws Exception if the reply is closed
     * @throws BadMethodCallException if not a status code reply
     * @return string
     */
    public function getStatus()
    {
        // Response can be closed before
        $this->_checkClosed();
        $this->_process();

        if (self::STATUS !== $this->_header->type)
        {
            throw new BadMethodCallException('Not a status code reply: '.$this->_header->type);
        }

        $this->_closed = true;
        return $this->_header->status;
    }

    /**
     * Gets content of a bulk reply. The reply is closed afterwards.
     *
     * @throws Exception if the reply is closed
     * @throws BadMethodCallException if not a bulk reply
     * @return string|null Null for a null bulk reply ($-1)
     */
    public function getBulk()
    {
        // Response can be closed before
        $this->_checkClosed();
        $this->_process();

        if (self::BULK !== $this->_header->type)
        {
            throw new BadMethodCallException('Not a bulk reply: '.$this->_header->type);
        }

        $this->_closed = true;
        return $this->_getBulk($this->_socket, (int) $this->_header->length);
    }

    /**
     * Gets content of a multi-bulk reply. The reply is closed afterwards.
     *
     * Elements are converted according to their own type: bulk elements become strings
     * (or null), integer elements become ints, status and error elements become their
     * text and nested multi-bulk elements become nested arrays.
     *
     * @throws Exception if the reply is closed
     * @throws BadMethodCallException if not a multi-bulk reply
     * @throws UnexpectedValueException if an element header cannot be parsed
     * @return array|null Returns null if key does not exist
     */
    public function getMultiBulk()
    {
        $this->_checkClosed();
        $this->_process();

        if (self::MULTIBULK !== $this->_header->type)
        {
            throw new BadMethodCallException('Not a multi-bulk reply: '.$this->_header->type);
        }

        $data = $this->_readMultiBulk((int) $this->_header->length);
        $this->_closed = true;
        return $data;
    }

    /**
     * Gets parsed reply header, reading it from the socket when necessary.
     *
     * @return SpicaRedisReplyHeader
     */
    public function getHeader()
    {
        $this->_process();

        return $this->_header;
    }

    /**
     * Discards the current Redis reply and moves the stream pointer past it.
     * This makes the socket available for the next reply.
     *
     * A header that was already consumed (for instance by {@link getType()}) is
     * reused rather than read a second time.
     *
     * @throws UnexpectedValueException if reply type is not recognized
     * @throws Exception if the reply is closed
     */
    public function close()
    {
        $this->_checkClosed();
        $this->_process();
        $this->_discard($this->_header);
        $this->_closed = true;
    }

    /**
     * Is reply header (meta data) checked?
     *
     * @return bool
     */
    public function isChecked()
    {
        return $this->_headerChecked;
    }

    /**
     * Is the reply closed?
     *
     * @see #close()
     * @return bool
     */
    public function isClosed()
    {
        return $this->_closed;
    }

    /**
     * Checks if the reply is closed, either explicitly by the user or because all
     * the reply content has been retrieved.
     *
     * @throws Exception if the reply is closed
     */
    private function _checkClosed()
    {
        if (true === $this->_closed)
        {
            throw new Exception('Reply is closed.');
        }
    }

    /**
     * Reads the reply header from the socket, unless that has been done already.
     */
    private function _process()
    {
        if (true === $this->_headerChecked)
        {
            return;
        }

        $this->_header = $this->_readReplyHeader($this->_socket);
        $this->_headerChecked = true;
    }

    /**
     * Reads the elements of a multi-bulk reply whose header has been consumed.
     *
     * @throws UnexpectedValueException if an element header cannot be parsed
     * @param  int $count Number of elements announced by the header
     * @return array|null Null when the announced count is negative (null multi-bulk)
     */
    private function _readMultiBulk($count)
    {
        if ($count < 0)
        {
            return null;
        }

        $data = array();

        for ($i = 0; $i < $count; $i++)
        {
            $data[] = $this->_readValue($this->_readReplyHeader($this->_socket));
        }

        return $data;
    }

    /**
     * Reads the value that belongs to an already parsed header.
     *
     * @throws UnexpectedValueException if the header type is not recognized
     * @param  SpicaRedisReplyHeader $header
     * @return mixed
     */
    private function _readValue($header)
    {
        switch ($header->type)
        {
            case self::BULK:
                return $this->_getBulk($this->_socket, (int) $header->length);

            case self::MULTIBULK:
                return $this->_readMultiBulk((int) $header->length);

            case self::INT:
                return (int) $header->length;

            case self::STATUS:
                return $header->status;

            case self::ERROR:
                return $header->error;

            default:
                throw new UnexpectedValueException("Invalid reply type: '".$header->type."'");
        }
    }

    /**
     * Skips whatever follows an already parsed header on the socket.
     *
     * @throws UnexpectedValueException if the header type is not recognized
     * @param  SpicaRedisReplyHeader $header
     */
    private function _discard($header)
    {
        switch ($header->type)
        {
            case self::BULK:
                // Consumes the payload and its trailing CRLF (nothing for a null bulk)
                $this->_getBulk($this->_socket, (int) $header->length);
                break;

            case self::MULTIBULK:
                // Every element carries its own header; empty and null multi-bulk
                // replies are followed by no data at all
                $count = (int) $header->length;

                for ($i = 0; $i < $count; $i++)
                {
                    $this->_discard($this->_readReplyHeader($this->_socket));
                }

                break;

            case self::ERROR:
            case self::STATUS:
            case self::INT:
                // Single-line replies: fully consumed together with the header
                break;

            default:
                throw new UnexpectedValueException("Invalid reply type character: '".$header->type."'");
        }
    }

    /**
     * Reads and parses reply header from the socket.
     *
     * @throws UnexpectedValueException if nothing can be read, the type byte is unknown
     *                                  or a bulk/multi-bulk length is not numeric
     * @param  resource $socket Socket stream
     * @return SpicaRedisReplyHeader
     */
    private function _readReplyHeader($socket)
    {
        $head = new SpicaRedisReplyHeader();
        // Check first character
        $c = fgetc($socket);

        if (false === $c)
        {
            throw new UnexpectedValueException('Unable to read reply type byte from the socket.');
        }

        switch ($c)
        {
            case '-': // Error reply such as -ERR unknown command
                $line = trim(fgets($socket), SpicaRedisCommand::NL);
                // Strip the generic "ERR " prefix only; other prefixes are part of the message
                $head->error = (0 === strpos($line, 'ERR ')) ? substr($line, 4) : $line;
                $head->type  = self::ERROR;
                break;

            case '+': // Status code reply like +OK or +PONG
                $head->status = rtrim(fgets($socket), SpicaRedisCommand::NL);
                $head->type   = self::STATUS;
                break;

            case ':': // Integer reply such as :0 or :1
                $head->length = rtrim(fgets($socket), SpicaRedisCommand::NL);
                $head->type   = self::INT;
                break;

            case '$': // Bulk reply
                // Gets first line for data length
                $head->length = rtrim(fgets($socket), SpicaRedisCommand::NL);
                $head->type   = self::BULK;

                if (false === is_numeric($head->length))
                {
                    throw new UnexpectedValueException('Cannot parse '.$head->length.' as data length');
                }

                break;

            case '*': // Multi-bulk reply
                // Number of data items
                $head->length = trim(fgets($socket), SpicaRedisCommand::NL);
                $head->type   = self::MULTIBULK;

                if (false === is_numeric($head->length))
                {
                    throw new UnexpectedValueException('Cannot parse '.$head->length.' as data length');
                }

                break;

            default:
                throw new UnexpectedValueException("Invalid reply type byte: '$c'");
        }

        return $head;
    }

    /**
     * Gets content of a bulk reply from a provided socket and data length.
     *
     * @see    getBulk()
     * @see    getMultiBulk()
     * @param  resource $socket Connection socket
     * @param  int      $length Bytes to read
     * @return string|null Null when the length is negative (null bulk)
     */
    private function _getBulk($socket, $length)
    {
        // A null bulk ($-1) consists of the header line only
        if ($length < 0)
        {
            return null;
        }

        $buffer = (0 === $length) ? '' : stream_get_contents($socket, $length);
        // The payload, even an empty one, is always followed by \r\n
        stream_get_contents($socket, 2);
        return $buffer;
    }
}

/**
 * Represents Redis reply meta data.
 *
 * namespace spica\core\datasource\redis\RedisReplyHeader;
 *
 * @category   spica
 * @package    core
 * @subpackage datasource\redis
 * @author     Pham Cong Dinh <pcdinh at phpvietnam dot net>
 * @since      Version 0.3
 * @since      December 23, 2009
 * @copyright  Pham Cong Dinh (http://www.phpvietnam.net)
 * @license    http://www.gnu.org/licenses/lgpl-3.0.txt
 * @version    $Id: Redis.php 1869 2011-01-07 18:55:25Z pcdinh $
 */
class SpicaRedisReplyHeader
{
    /**
     * Type of the reply this header belongs to.
     *
     * @var int
     */
    public $type = null;

    /**
     * Status code of a status reply, such as OK or PONG.
     *
     * @var string
     */
    public $status = null;

    /**
     * Value announced on the first line of an integer, bulk or multi-bulk reply.
     *
     * @var string
     */
    public $length = null;

    /**
     * Error message of an error reply.
     *
     * @var string
     */
    public $error = null;

    /**
     * Constructs an object of <code>SpicaRedisReplyHeader</code> with every field unset.
     */
    public function __construct()
    {
    }
}

?>